[Java Multithreading] JUC Collections 06. LinkedBlockingDeque

LinkedBlockingDeque

1. Introduction

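  • A concurrent blocking deque backed by a doubly linked list; elements can be inserted and removed at both the head and the tail
  • The default capacity is Integer.MAX_VALUE; an explicit capacity can be set to keep the deque from growing without bound
  • Thread-safe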
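
The points above can be sketched with the public API. This is a minimal illustration, assuming an arbitrary capacity of 3; the class name DequeBasicsDemo and the helper run() are hypothetical and not part of the JDK.

```java
import java.util.concurrent.LinkedBlockingDeque;

// Hypothetical demo class, used only for illustration.
class DequeBasicsDemo {
    // Builds a small bounded deque and exercises both ends.
    static String run() {
        // Capacity 3 is an arbitrary choice for this sketch.
        LinkedBlockingDeque<String> deque = new LinkedBlockingDeque<>(3);
        deque.offerLast("b");   // [b]
        deque.offerFirst("a");  // [a, b]
        deque.offerLast("c");   // [a, b, c]

        // The deque is full, so the non-blocking insert is rejected.
        boolean accepted = deque.offerLast("d");

        String head = deque.pollFirst(); // removes "a" from the head
        String tail = deque.pollLast();  // removes "c" from the tail
        return accepted + ":" + head + ":" + tail + ":" + deque;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints false:a:c:[b]
    }
}
```

Bounding the capacity turns a full deque into a visible signal (a false return value here) instead of unbounded memory growth.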

2. Source Code Analysis

2.1 Data Structure

public class LinkedBlockingDeque<E>
    extends AbstractQueue<E>
    implements BlockingDeque<E>, java.io.Serializable {

    transient Node<E> first; // head of the deque

    transient Node<E> last; // tail of the deque

    /** Number of items in the deque */
    private transient int count;

    /** Maximum number of items in the deque */
    private final int capacity;

    /** Main lock guarding all access */
    final ReentrantLock lock = new ReentrantLock();

    /** Condition for waiting takes */
    private final Condition notEmpty = lock.newCondition();

    /** Condition for waiting puts */
    private final Condition notFull = lock.newCondition();
}

// Doubly linked list node (a static nested class of LinkedBlockingDeque)
static final class Node<E> {
    /**
     * The item, or null if this node has been removed.
     */
    E item;

    /**
     * One of:
     * - the real predecessor Node
     * - this Node, meaning the predecessor is tail
     * - null, meaning there is no predecessor
     */
    Node<E> prev;

    /**
     * One of:
     * - the real successor Node
     * - this Node, meaning the successor is head
     * - null, meaning there is no successor
     */
    Node<E> next;

    Node(E x) {
        item = x;
    }
}

[Figure: UML class diagram]

2.2 Core Methods

  • Constructors
public LinkedBlockingDeque() {
    this(Integer.MAX_VALUE);
}

public LinkedBlockingDeque(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
}

// Creates a LinkedBlockingDeque with capacity Integer.MAX_VALUE that initially
// contains the elements of the given collection, added in the traversal order
// of the collection's iterator
public LinkedBlockingDeque(Collection<? extends E> c) {
    this(Integer.MAX_VALUE);
    final ReentrantLock lock = this.lock;
    lock.lock(); // Never contended, but necessary for visibility
    try {
        for (E e : c) {
            if (e == null)
                throw new NullPointerException();
            if (!linkLast(new Node<E>(e)))
                throw new IllegalStateException("Deque full");
        }
    } finally {
        lock.unlock();
    }
}
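
The behavior of the three constructors can be checked from the outside. The following is a small sketch; ConstructorDemo and its helper methods are hypothetical names chosen for this example.

```java
import java.util.Arrays;
import java.util.concurrent.LinkedBlockingDeque;

// Hypothetical demo class, used only for illustration.
class ConstructorDemo {
    // The no-arg constructor delegates to this(Integer.MAX_VALUE).
    static int defaultCapacity() {
        return new LinkedBlockingDeque<String>().remainingCapacity();
    }

    // A capacity <= 0 is rejected with IllegalArgumentException.
    static boolean rejectsNonPositiveCapacity() {
        try {
            new LinkedBlockingDeque<String>(0);
            return false;
        } catch (IllegalArgumentException expected) {
            return true;
        }
    }

    // Elements are linked at the tail in the collection's iteration order.
    static String fromCollection() {
        return new LinkedBlockingDeque<Integer>(Arrays.asList(1, 2, 3)).toString();
    }

    // A null element in the source collection triggers NullPointerException.
    static boolean rejectsNullElement() {
        try {
            new LinkedBlockingDeque<String>(Arrays.asList("a", null));
            return false;
        } catch (NullPointerException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(defaultCapacity() == Integer.MAX_VALUE); // prints true
        System.out.println(rejectsNonPositiveCapacity());           // prints true
        System.out.println(fromCollection());                       // prints [1, 2, 3]
        System.out.println(rejectsNullElement());                   // prints true
    }
}
```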
  • Adding elements
// Inserts at the tail of the deque
public boolean offerLast(E e) {
    if (e == null) throw new NullPointerException();
    Node<E> node = new Node<E>(e);
    final ReentrantLock lock = this.lock;
    lock.lock(); // acquire the lock
    try {
        return linkLast(node);
    } finally {
        lock.unlock(); // release the lock
    }
}

private boolean linkLast(Node<E> node) {
    // assert lock.isHeldByCurrentThread();
    if (count >= capacity)
        return false; // the node count has reached the capacity, so the insert fails
    Node<E> l = last;
    node.prev = l;
    last = node;
    if (first == null)
        first = node;
    else
        l.next = node;
    ++count;
    notEmpty.signal(); // wake one thread waiting on notEmpty
    return true;
}
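
Because linkLast returns false when the deque is full, the public insert methods differ only in how they react to that result. The sketch below contrasts three of them on a deque of capacity 1; InsertDemo is a hypothetical class name, and the 50 ms timeout is an arbitrary value chosen for the example.

```java
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, used only for illustration.
class InsertDemo {
    static String run() {
        LinkedBlockingDeque<Integer> deque = new LinkedBlockingDeque<>(1);

        boolean first = deque.offerLast(1);  // succeeds: the deque was empty
        boolean second = deque.offerLast(2); // fails immediately: the deque is full

        // addLast reports a full deque with an exception instead of false.
        boolean threw;
        try {
            deque.addLast(3);
            threw = false;
        } catch (IllegalStateException full) {
            threw = true;
        }

        // The timed variant waits on notFull for up to the timeout, then gives up.
        boolean timed;
        try {
            timed = deque.offerLast(4, 50, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }

        return first + "," + second + "," + threw + "," + timed + "," + deque.size();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints true,false,true,false,1
    }
}
```

putLast(e) is the fourth variant: it waits on notFull without a timeout until space becomes available.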
  • Removing elements: take()
public E take() throws InterruptedException {
    return takeFirst(); // removes and returns the head element
}

public E takeFirst() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        E x;
        while ( (x = unlinkFirst()) == null)
            notEmpty.await(); // the deque is empty, so wait
        return x;
    } finally {
        lock.unlock();
    }
}

private E unlinkFirst() {
    // assert lock.isHeldByCurrentThread();
    Node<E> f = first;
    if (f == null)
        return null;
    Node<E> n = f.next;
    E item = f.item;
    f.item = null;
    f.next = f; // help GC
    first = n;
    if (n == null)
        last = null;
    else
        n.prev = null;
    --count;
    notFull.signal(); // wake one thread waiting on notFull
    return item;
}
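
The blocking behavior of take() is easiest to see with a second thread. In this sketch the consumer calls take() on an empty deque and is released once a producer inserts an element; TakeDemo is a hypothetical class name and the 100 ms delay is arbitrary.

```java
import java.util.concurrent.LinkedBlockingDeque;

// Hypothetical demo class, used only for illustration.
class TakeDemo {
    static String run() {
        LinkedBlockingDeque<String> deque = new LinkedBlockingDeque<>();

        // The producer inserts after a short delay, so the consumer has to wait.
        Thread producer = new Thread(() -> {
            try {
                Thread.sleep(100);
                deque.putLast("hello"); // signals notEmpty after linking the node
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        try {
            String value = deque.take(); // waits on notEmpty while the deque is empty
            producer.join();
            return value;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints hello
    }
}
```

The while loop around notEmpty.await() in takeFirst matters here: a woken thread retries unlinkFirst() and goes back to waiting if another consumer removed the element first.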
  • iterator()

Two traversal orders are provided: forward, via iterator(), and descending, via descendingIterator().

/** Forward iterator */
private class Itr extends AbstractItr {
    Node<E> firstNode() { return first; }
    Node<E> nextNode(Node<E> n) { return n.next; }
}

/** Descending iterator */
private class DescendingItr extends AbstractItr {
    Node<E> firstNode() { return last; }
    Node<E> nextNode(Node<E> n) { return n.prev; }
}
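
From the caller's side, the two inner classes are reached through iterator() and descendingIterator(). A short sketch of both directions, where IteratorDemo and the helper drain() are hypothetical names:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.LinkedBlockingDeque;

// Hypothetical demo class, used only for illustration.
class IteratorDemo {
    // Collects everything the iterator yields into a list.
    static List<Integer> drain(Iterator<Integer> it) {
        List<Integer> out = new ArrayList<>();
        while (it.hasNext()) {
            out.add(it.next());
        }
        return out;
    }

    static String run() {
        LinkedBlockingDeque<Integer> deque =
            new LinkedBlockingDeque<Integer>(Arrays.asList(1, 2, 3));
        // iterator() walks first -> last; descendingIterator() walks last -> first.
        return drain(deque.iterator()) + " " + drain(deque.descendingIterator());
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [1, 2, 3] [3, 2, 1]
    }
}
```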

3. References

http://www.cnblogs.com/skywang12345/p/3503480.html